package com.java110.databus.kafka;

import com.alibaba.fastjson.JSONObject;
import com.java110.core.base.controller.BaseController;
import com.java110.core.factory.AuthenticationFactory;
import com.java110.databus.reportData.IReportDataAdapt;
import com.java110.dto.govCommunity.GovCommunityDto;
import com.java110.dto.reportData.ReportDataDto;
import com.java110.dto.reportData.ReportDataHeaderDto;
import com.java110.intf.assets.IGovCommunityInnerServiceSMO;
import com.java110.utils.factory.ApplicationContextFactory;
import com.java110.utils.kafka.KafkaFactory;
import com.java110.utils.util.Assert;
import com.java110.utils.util.BeanConvertUtil;
import com.java110.utils.util.DateUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;

/**
 * Kafka listener for the {@code hcGov} topic.
 *
 * <p>Each record is expected to carry a JSON document with a {@code header} and a
 * {@code body}. The message is validated, its signature is checked against the secret
 * of the community named in {@code header.extCommunityId}, the adapter bean named by
 * {@code header.serviceCode} processes it, and the result (or an error reply) is
 * published to the topic configured on that community.
 *
 * Created by wuxw on 2018/4/15.
 */
public class DatabusServiceKafka extends BaseController {

    private static final Logger logger = LoggerFactory.getLogger(DatabusServiceKafka.class);

    @Autowired
    private IGovCommunityInnerServiceSMO govCommunityInnerServiceSMOImpl;

    /**
     * Entry point invoked for every record on the {@code hcGov} topic.
     *
     * @param record the consumed record; its value is converted with {@code toString()}
     */
    @KafkaListener(topics = {"hcGov"})
    public void listen(ConsumerRecord<?, ?> record) {
        Object value = record.value();
        if (value == null) {
            // A record may legitimately carry no value; there is nothing to process.
            logger.warn("kafka的value为空, 忽略该消息");
            return;
        }
        String orderInfo = value.toString();
        logger.info("kafka的value: {}", orderInfo);
        // business processing
        doListen(orderInfo);
    }

    /**
     * Processes one report message and publishes the reply.
     *
     * <p>Any failure is converted into an error reply. A reply can only be published when
     * the community was resolved, because the reply topic is read from the community.
     *
     * @param orderInfo raw JSON text of the report message
     */
    private void doListen(String orderInfo) {
        ReportDataDto reportDataDto = null;
        GovCommunityDto govCommunity = null;
        JSONObject header = null;
        try {
            // Parsing happens inside the try so malformed input follows the error path.
            JSONObject reqJson = JSONObject.parseObject(orderInfo);
            header = reqJson == null ? null : reqJson.getJSONObject("header");
            if (header == null) {
                throw new IllegalArgumentException("请求报文中未包含header");
            }
            // Resolve the community first: it supplies both the secret and the reply topic.
            govCommunity = getExtCommunityCode(header.getString("extCommunityId"));
            // structural validation of the message
            preValiateOrderInfo(orderInfo);
            // build the DTO
            reportDataDto = freshReportDataDto(orderInfo);
            // verify the request signature
            AuthenticationFactory.authReportDataSign(reportDataDto, govCommunity.getCommunitySecure());
            // look up the adapter bean named by serviceCode
            IReportDataAdapt reportDataAdapt = ApplicationContextFactory.getBean(
                    reportDataDto.getReportDataHeaderDto().getServiceCode(), IReportDataAdapt.class);
            if (reportDataAdapt == null) {
                throw new IllegalArgumentException("serviceCode 错误 请检查");
            }
            // business processing
            reportDataDto = reportDataAdapt.report(reportDataDto, govCommunity);
            // write the sign back onto the reply
            AuthenticationFactory.generatorReportDataSign(reportDataDto, govCommunity.getCommunitySecure());
        } catch (Exception e) {
            logger.error("处理政务数据上报失败", e);
            String tranId = "";
            String serviceCode = "";
            if (header != null && header.containsKey("serviceCode")) {
                tranId = header.getString("tranId");
                serviceCode = header.getString("serviceCode");
            }
            // Build the error reply first, then sign that reply; signing the request DTO
            // beforehand would be lost because the reply carries a fresh header.
            reportDataDto = createReportDataDto(ReportDataHeaderDto.CODE_ERROR, e.getLocalizedMessage(),
                    tranId, serviceCode, reportDataDto);
            signErrorReply(reportDataDto, govCommunity);
        }
        sendReply(govCommunity, reportDataDto);
    }

    /**
     * Signs an error reply on a best-effort basis.
     *
     * <p>NOTE(review): this presumes {@code generatorReportDataSign} stores the signature
     * on the DTO it receives, as the success path uses it to write the sign back — confirm.
     *
     * @param errorReply   the error reply to sign
     * @param govCommunity the resolved community, or {@code null} when the lookup failed
     */
    private void signErrorReply(ReportDataDto errorReply, GovCommunityDto govCommunity) {
        if (govCommunity == null) {
            // Without a community there is neither a secret nor a topic to reply to.
            return;
        }
        try {
            AuthenticationFactory.generatorReportDataSign(errorReply, govCommunity.getCommunitySecure());
        } catch (Exception signError) {
            // An unsigned error reply is still more useful to the sender than none.
            logger.warn("错误应答签名失败", signError);
        }
    }

    /**
     * Publishes the reply to the community's topic; failures are logged, not rethrown.
     *
     * @param govCommunity  the resolved community, or {@code null} when the lookup failed
     * @param reportDataDto the reply to publish
     */
    private void sendReply(GovCommunityDto govCommunity, ReportDataDto reportDataDto) {
        if (govCommunity == null || reportDataDto == null) {
            logger.error("无法回写kafka消息: 未获取到小区信息或返回对象为空");
            return;
        }
        try {
            JSONObject returnJson = new JSONObject();
            returnJson.put("header", reportDataDto.getReportDataHeaderDto());
            returnJson.put("body", reportDataDto.getReportDataBodyDto());
            String payload = returnJson.toJSONString();
            logger.info("返回kafka消息{}", payload);
            KafkaFactory.sendKafkaMessage(govCommunity.getTopic(), payload);
        } catch (Exception e) {
            logger.error("发送kafka 消息失败", e);
        }
    }

    /**
     * Looks up the community record for the given id.
     *
     * @param communityId value of {@code header.extCommunityId}
     * @return the first community returned by the query
     * @throws IllegalArgumentException if the id is blank or no community is found
     */
    private GovCommunityDto getExtCommunityCode(String communityId) {
        if (communityId == null || communityId.trim().isEmpty()) {
            // NOTE(review): an unset id would presumably leave the query unfiltered — reject it up front.
            throw new IllegalArgumentException("请求报文中未包含extCommunityId");
        }
        GovCommunityDto govCommunityDto = new GovCommunityDto();
        govCommunityDto.setGovCommunityId(communityId);
        List<GovCommunityDto> govCommunityDtos = govCommunityInnerServiceSMOImpl.queryGovCommunitys(govCommunityDto);
        if (govCommunityDtos == null || govCommunityDtos.isEmpty()) {
            throw new IllegalArgumentException("政务系统未查到对应小区信息");
        }
        return govCommunityDtos.get(0);
    }


    /**
     * Builds the request DTO from the raw message.
     *
     * @param orderInfo raw JSON text of the report message
     * @return a DTO whose header is converted from {@code header} and whose body is {@code body}
     */
    private ReportDataDto freshReportDataDto(String orderInfo) {
        ReportDataDto reportDataDto = new ReportDataDto();
        JSONObject reqJson = JSONObject.parseObject(orderInfo);
        ReportDataHeaderDto reportDataHeaderDto = BeanConvertUtil.covertBean(reqJson.getJSONObject("header"), ReportDataHeaderDto.class);
        reportDataDto.setReportDataHeaderDto(reportDataHeaderDto);
        reportDataDto.setReportDataBodyDto(reqJson.getJSONObject("body"));

        return reportDataDto;
    }

    /**
     * Builds a reply DTO with a fresh header.
     *
     * @param code        result code for the header
     * @param msg         result message for the header
     * @param tranId      transaction id echoed from the request
     * @param serviceCode service code echoed from the request
     * @param reportData  request DTO whose body is echoed back; may be {@code null} when the
     *                    failure happened before the request DTO was built
     * @return the reply DTO; its body is an empty JSON object when no request body is available
     */
    private ReportDataDto createReportDataDto(int code, String msg, String tranId, String serviceCode,ReportDataDto reportData) {
        ReportDataDto reportDataDto = new ReportDataDto();
        ReportDataHeaderDto reportDataHeaderDto = new ReportDataHeaderDto();
        reportDataHeaderDto.setTranId(tranId);
        reportDataHeaderDto.setResTime(DateUtil.getNow(DateUtil.DATE_FORMATE_STRING_DEFAULT));
        reportDataHeaderDto.setCode(code);
        reportDataHeaderDto.setMsg(msg);
        reportDataHeaderDto.setServiceCode(serviceCode);
        reportDataDto.setReportDataHeaderDto(reportDataHeaderDto);
        if (reportData != null && reportData.getReportDataBodyDto() != null) {
            reportDataDto.setReportDataBodyDto(reportData.getReportDataBodyDto());
        } else {
            // Failures before the request DTO exists still need a well-formed reply.
            reportDataDto.setReportDataBodyDto(new JSONObject());
        }
        return reportDataDto;
    }


    /**
     * Pre-validates the message: it must contain {@code header} and {@code body}, and the
     * header must contain {@code serviceCode}, {@code sign}, {@code reqTime} and
     * {@code extCommunityId}.
     *
     * @param orderInfo raw JSON text of the report message
     */
    private void preValiateOrderInfo(String orderInfo) {
        JSONObject reqJson = JSONObject.parseObject(orderInfo);
        Assert.hasKeyAndValue(reqJson, "header", "请求报文中未包含header");
        Assert.hasKeyAndValue(reqJson, "body", "请求报文中未包含body");
        JSONObject header = reqJson.getJSONObject("header");
        Assert.hasKeyAndValue(header, "serviceCode", "请求报文中未包含serviceCode");
        Assert.hasKeyAndValue(header, "sign", "请求报文中未包含sign");
        Assert.hasKeyAndValue(header, "reqTime", "请求报文中未包含reqTime");
        Assert.hasKeyAndValue(header, "extCommunityId", "请求报文中未包含extCommunityId");
    }

}
